package fun.easycode.datastream;

import fun.easycode.datastream.util.TableUtil;
import fun.easycode.jointblock.core.JointBlockMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.rocketmq.client.exception.MQClientException;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Framework上下文
 * @author xuzhen97
 */
@Slf4j
public class DataContext implements ApplicationContextAware {

    private static final String lockNode = "/data-stream/locks";

    private static ApplicationContext CONTEXT;

    private static final Map<String, IDataProcessor<?>> MARK_PROCESSOR_MAP = new HashMap<>();
    private static final Map<Class<?>, IDataProcessor<?>> CLASS_PROCESSOR_MAP = new HashMap<>();
    private static final Map<String, List<IDataProcessor<?>>> TABLE_NAME_PROCESSOR_LIST_MAP = new HashMap<>();

    private static final Map<String, IDataTransformer<?,?>> MARK_TRANSFORM_MAP = new HashMap<>();
    private static final Map<Class<?>, IDataTransformer<?,?>> CLASS_TRANSFORM_MAP = new HashMap<>();
    private static final Map<String, List<IDataTransformer<?,?>>> TABLE_NAME_TRANSFORM_LIST_MAP = new HashMap<>();

    private static final List<String> OUTPUT_TABLE_NAME_LIST = new ArrayList<>();

    private final DataQuantityManager dataQuantityManager;
    private final CanalAdapter canalAdapter;
    private final CuratorFramework curatorFramework;
    private final DataCompleteStartConsumer dataCompleteStartConsumer;
    private final DataCompleteStartProducer dataCompleteStartProducer;

    public DataContext(DataQuantityManager dataQuantityManager
            , CanalAdapter canalAdapter
            , CuratorFramework curatorFramework
            , DataCompleteStartConsumer dataCompleteStartConsumer
            , DataCompleteStartProducer dataCompleteStartProducer) {
        this.dataQuantityManager = dataQuantityManager;
        this.canalAdapter = canalAdapter;
        this.curatorFramework = curatorFramework;
        this.dataCompleteStartConsumer = dataCompleteStartConsumer;
        this.dataCompleteStartProducer = dataCompleteStartProducer;
    }

    /**
     * 获取mapper
     * @param clazz mapper类
     * @return mapper
     * @param <I> 输入数据类型
     * @param <T> mapper类型
     */
    public static <I, T extends JointBlockMapper<I>> JointBlockMapper<I> getMapper(Class<T> clazz){
        return CONTEXT.getBean(clazz);
    }

    /**
     * 查看输出表的位置
     * @param tableName 表名
     * @return 输出表的位置
     */
    public static int indexOf(String tableName){
        return OUTPUT_TABLE_NAME_LIST.indexOf(tableName);
    }

    /**
     * 获取数据处理器
     * @param mark 数据处理器mark
     * @return 数据处理器
     */
    public static IDataProcessor<?> getProcessor(String mark){
        return MARK_PROCESSOR_MAP.get(mark);
    }

    /**
     * 获取数据处理器
     * @param clazz 数据处理器类Class
     * @return 数据处理器
     */
    public static IDataProcessor<?> getProcessor(Class<? extends IDataProcessor<?>> clazz){
        return CLASS_PROCESSOR_MAP.get(clazz);
    }

    /**
     * 获取数据处理器列表
     * @param tableName 表名
     * @return 数据处理器列表
     */
    public static List<IDataProcessor<?>> getProcessors(String tableName){
        return TABLE_NAME_PROCESSOR_LIST_MAP.get(tableName);
    }

    /**
     * 获取数据转换器
     * @param mark 数据转换器mark
     * @return 数据转换器
     */
    public static IDataTransformer<?,?> getTransform(String mark){
        return MARK_TRANSFORM_MAP.get(mark);
    }

    /**
     * 获取数据转换器
     * @param clazz 数据转换器类Class
     * @return 数据转换器
     */
    public static IDataTransformer<?,?> getTransform(Class<?> clazz){
        return CLASS_TRANSFORM_MAP.get(clazz);
    }

    /**
     * 获取数据转换器列表
     * @param tableName 表名
     * @return 数据转换器列表
     */
    public static List<IDataTransformer<?,?>> getTransforms(String tableName){
        return TABLE_NAME_TRANSFORM_LIST_MAP.get(tableName);
    }

    public static String getTableName(IDataProcessor<?> processor) {
        return TableUtil.getTableName(processor.getParam().getInputClass());
    }

    public static String getTableName(IDataTransformer<?,?> transform) {
        return TableUtil.getTableName(transform.getParam().getInputClass());
    }

    /**
     * 根据契约函数锁住执行逻辑
     *  zookeeper实现分布式锁
     * @param contract 契约
     * @param runnable 执行逻辑
     */
    public static void tryLock(IDataContract contract, Runnable runnable){

        CuratorFramework curatorFramework = CONTEXT.getBean(CuratorFramework.class);

        String lockPath = lockNode + "/" + contract.getMark();

        InterProcessLock lock = new InterProcessMutex(curatorFramework, lockPath);
        try {
            lock.acquire();
            log.info("lock success, lockPath: {}", lockPath);
            runnable.run();
            lock.release();
            log.info("release success, lockPath: {}", lockPath);
        }catch (Exception e){
            try {
                lock.release();
                log.info("release success, lockPath: {}", lockPath);
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * 初始化
     * @throws MQClientException MQ客户端异常
     */
    @PostConstruct
    public void init() throws MQClientException {
        log.info("QuickContext init");
        loadProcessor(CONTEXT);
        log.info("load processor success");
        loadTransform(CONTEXT);
        log.info("load transform success");
        loadOutputTableNames();
        log.info("load output table name success");
        // 等待所有处理器初始化完成，再启动canal adapter
        canalAdapter.start();
        log.info("canal adapter start success");
        curatorFramework.start();
        log.info("curator framework start success");
        dataCompleteStartConsumer.start();
        log.info("data complete start consumer start success");
        dataCompleteStartProducer.start();
        log.info("data complete start producer start success");
        log.info("QuickContext init success");
    }

    /**
     * 销毁
     */
    @PreDestroy
    public void destroy() {
        log.info("QuickContext destroy");
        canalAdapter.stop();
        log.info("QuickContext destroy success");
    }

    @Override
    public void setApplicationContext(@NotNull ApplicationContext applicationContext) throws BeansException {
        CONTEXT = applicationContext;
    }

    /**
     * 加载输出表名
     */
    private void loadOutputTableNames(){
        List<String> outputTableNames = MARK_PROCESSOR_MAP.values().stream()
                .map(processor -> processor.getParam().getOutputTableName())
                // 字典排序
                .sorted()
                .collect(Collectors.toList());

        OUTPUT_TABLE_NAME_LIST.addAll(outputTableNames);
    }

    /**
     * 加载处理器
     * @param applicationContext Spring上下文
     */
    private void loadProcessor(ApplicationContext applicationContext) throws MQClientException {
        Map<String, IDataProcessor> beansOfType = applicationContext.getBeansOfType(IDataProcessor.class);
        for (IDataProcessor<?> dataProcessor : beansOfType.values()) {

            dataQuantityManager.register(dataProcessor);

            // 处理表名为key相关的处理器map
            String tableName = TableUtil.getTableName(dataProcessor.getParam().getInputClass());

            List<IDataProcessor<?>> iDataProcessors = TABLE_NAME_PROCESSOR_LIST_MAP.computeIfAbsent(tableName, k -> new ArrayList<>());

            iDataProcessors.add(dataProcessor);

            // 处理类名为key相关的处理器map
            MARK_PROCESSOR_MAP.put(dataProcessor.getMark(), dataProcessor);

            // 处理类为key相关的处理器map
            CLASS_PROCESSOR_MAP.put(dataProcessor.getClass(), dataProcessor);
        }
    }

    /**
     * 加载转换器
     * @param applicationContext Spring上下文
     */
    private void loadTransform(ApplicationContext applicationContext) throws MQClientException {
        Map<String, IDataTransformer> beansOfType = applicationContext.getBeansOfType(IDataTransformer.class);
        for (IDataTransformer<?, ?> dataTransform : beansOfType.values()) {

            dataQuantityManager.register(dataTransform);

            // 处理表名为key相关的转换器map
            String tableName = TableUtil.getTableName(dataTransform.getParam().getInputClass());

            List<IDataTransformer<?, ?>> iDataTransformers = TABLE_NAME_TRANSFORM_LIST_MAP.computeIfAbsent(tableName, k -> new ArrayList<>());

            iDataTransformers.add(dataTransform);

            // 处理名称为key相关的转换器map
            MARK_TRANSFORM_MAP.put(dataTransform.getMark(), dataTransform);

            // 处理类为key相关的转换器map
            CLASS_TRANSFORM_MAP.put(dataTransform.getClass(), dataTransform);
        }
    }
}
